<?php
// +----------------------------------------------------------------------
// | PHP 进程控制
// +----------------------------------------------------------------------
// | 非阻塞: 为让 PHP 在后端处理长时间任务时不阻塞，快速响应页面请求
// | 1. 使用 fastcgi_finish_request()
// | 2. 使用 fsockopen(),cURL 使用http
// | 3. 使用 popen()/proc_open()
// | 4. 使用 Swoole/workerman 扩展 多进程基于proc_open
// | 5. 使用 redis/memcache 缓存和队列
// | 6. 使用 pcntl扩展 或 spatie/async - 基于 pcntl 封装的扩展包
// | 7. 使用 parallel扩展
// | 异步: 发出请求之后，即返回客户端，销毁进程，而把剩余的工作交由其他进程慢慢做去。
// | 缺点: 并发非常多时会产生很多子进程，当达到apache的连接限制数时，就会挂掉
// +----------------------------------------------------------------------
// | Author: wm
// +----------------------------------------------------------------------

trait Async
{
    /**
     * 异步任务
     * @param callable $func   要异步执行的函数
     * @param mixed ...$params 要传递给$func的参数
     * @return int
     */
    public static function task(callable $func, ...$params)
    {
        // windows不支持
        if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
            exit('pcntl extension is not support for windows');
        }

        // Linux php 去掉 pcntl相关 禁用函数
        if (!extension_loaded('pcntl')) {
            exit('pcntl extension is not install');
        }

        // 命令模式执行 PHP_SAPI/php_sapi_name()
        if (!preg_match("/cli/i", php_sapi_name())) {
            exit('pcntl extension is not install');
        }

        pcntl_signal(SIGCHLD, SIG_IGN); // 安装监听信号
        $pid = pcntl_fork(); // 生成一个线程
        if ($pid == -1) {
            exit('could not fork for client');
        } else if ($pid) {
            // 父进程 (pid大于0为父进程，pid为子进程的id)
            return $pid;
        } else {
            // 子进程 (pid为0则为子进程) 进程互相独立
            try {
                // 执行用户函数
                call_user_func_array($func, $params);
            } finally {
                // 执行完后杀死进程
                posix_kill(posix_getpid(), SIGKILL);
                exit(0);// 一定要注意退出子进程,否则pcntl_fork() 会被子进程再fork,带来处理上的影响。
            }
        }
    }

    /**
     * 打开进程文件指针 (单项)
     * 传递少量参数
     * @param string $cmd  命令
     */
    public static function open_request($cmd)
    {
        // 加入重定向以得到标准错误输出 stderr。
        $handle = popen($cmd, 'r');
        //echo "'$handle'; " . gettype($handle) . "\n";
        $read = fread($handle, 2096);
        echo $read;
        pclose($handle);
    }

    /**
     * 执行一个命令，并且打开用来输入/输出的文件指针 (双向)
     * 新建子进程使用管道通信非阻塞执行PHP脚本
     * @param string $cmd    命令
     * @param array  $params 参数
     * @return array
     */
    public static function proc_open_request($cmd, $params = [])
    {
        $descriptor_spec = array(
            0 => array("pipe", "r"),  // 标准输入，子进程从此管道中读取数据
            1 => array("pipe", "w"),  // 标准输出，子进程向此管道中写入数据
            2 => array("pipe", "w")   // 2 => array("file", "/tem/error-output.txt", "a") // 标准错误，写入到一个文件
        );
        //$cwd = _NS_API_DIR_; //当前PHP进程的工作目录 '/tmp'
        //$env = array('foo' => 'bar');
        $process = proc_open($cmd, $descriptor_spec, $pipes);//, $cwd, $env

        if (is_resource($process)) {
            // $pipes 现在看起来是这样的：
            // 0 => 可以向子进程标准输入写入的句柄 STDIN, 往里面写入内容
            // 1 => 可以从子进程标准输出读取的句柄 STDOUT,子进程的输出（echo，print_r,var_dump()等等）
            // 错误输出将被追加到文件 /tmp/error-output.txt
            // stdin
            $stdin = serialize($params);
            fwrite($pipes[0], $stdin); //把参数传给脚本

            //$n = 10;
            //while($n--) {
            //    fwrite($pipes[0], "hello #$n \n");
            //    echo fread($pipes[1], 8192);
            //}
            //fclose($pipes[0]);
            //proc_close($process);

            fclose($pipes[0]);
            // 事件驱动(脚本结束事件),异步回调
            //register_shutdown_function(function() use($pipes, $process) {
            //stdout
            $stdout = stream_get_contents($pipes[1]);
            fclose($pipes[1]);
            //stderr
            $stderr = stream_get_contents($pipes[2]);
            fclose($pipes[2]);
            //切记：在调用 proc_close 之前关闭所有的管道以避免死锁。
            $status = proc_close($process);//返回进程的终止状态码,如果发生错则返回-1
            return array(
                'code' => $status,
                'msg'  => $stderr,
                'data' => $stdout,
            );
            //});
        }
        return ['code' => 1, 'msg'  => 'Error'];
    }

    /**
     * 打开一个网络连接 [和curl一样非阻塞多进程模拟多线程]
     * @param string  $url      请求地址
     * @param array   $data     请求数据
     * @param string  $method   请求方式
     * @param int     $timout   超时时长
     * @param boolean $blocking 阻塞模式
     * @return mixed
     */
    public static function sock_request($url, $data = [], $method = 'GET', $timout = 1, $blocking = false)
    {
        $info         = parse_url($url);
        $info['port'] = $info['port'] ?? 80;
        $info["host"] = $info["host"] ?? 'localhost';
        $info["host"] = $info["scheme"] == 'https' ? 'ssl://' . $info["host"] : $info["host"];
        $data         = $data ? http_build_query($data) : $info['query'];
        $fp           = fsockopen($info["host"], $info['port'], $err_no, $err_str, $timout);
        if (!$fp) {
            return ['code' => $err_no, 'msg' => $err_str];
        } else {
            stream_set_blocking($fp, $blocking);// 阻塞模式/非阻塞模式
            stream_set_timeout($fp, $timout);   // 设置超时时间
            $out = $method . " " . $info['path'] . "?" . $data . " HTTP/1.1\r\n";
            $out .= "Host: " . $info['host'] . "\r\n";
            if ($method == 'POST') {
                $out .= "Content-type:application/x-www-form-urlencoded\r\n";
                $out .= "Content-length:" . strlen($data) . "\r\n";
            }
            $out .= "Connection: Close\r\n\r\n";// 长连接关闭
            if ($method == 'POST') {
                $out .= $data;
            }
            fwrite($fp, $out);
            // 响应结果
            if ($blocking) {
                $response = '';
                while (!feof($fp)) {
                    $line     = fread($fp, 4096);
                    $response .= $line;
                }
                $pos      = strpos($response, "\r\n\r\n");
                $response = substr($response, $pos + 4);
            } else {
                usleep(1000); // 如果没有延时，可能在nginx服务器上就无法执行成功
                $response = ['code' => 0, 'msg' => 'Success'];
            }
            fclose($fp);
            return $response;
        }
    }

    /**
     * curl普通异步请求[不等待结果] 交给另一个脚本处理
     * @param string $url  接受curl请求的地址
     * @param $data
     * @return bool|string
     */
    public function curl_request($url,$data)
    {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);

        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, Array("Content-Type:application/json; charset=utf-8"));

        curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));//post方式数据为json格式
        curl_setopt($ch, CURLOPT_TIMEOUT, 1);//设置超时时间为1s

        $result = curl_exec($ch);
        curl_close($ch);
        return $result;
    }

    /**
     * 并发请求,类似一个进程实现了多个线程的功能
     * 异步请求
     * @param array    $urls            请求地址
     * @param Callback $callback        回调函数
     * @param array    $custom_options  curl其他参数 [CURLOPT_TIMEOUT => 1] 不等待返回返回值,交给其他页面执行
     * @return array
     */
    public static function curl_multi_request($urls, $callback = null, $custom_options = null)
    {
        $response = [];
        if (empty($urls)) {
            return $response;
        }

        // 创建批处理cURL句柄
        $mh   = curl_multi_init();

        // 增加curl 其他选项
        $std_options = [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HEADER         => 0,
            //CURLOPT_FOLLOWLOCATION => true,// 跟随location字段（如果有的话），递归直到真实数据返回回来
            //CURLOPT_MAXREDIRS      => 5,// 跟随location字段（如果有的话），递归直到真实数据返回回来
        ];
        $options = ($custom_options) ? ($std_options + $custom_options) : $std_options;

        $map = [];
        foreach($urls as $url){
            $ch                   = curl_init();
            $options[CURLOPT_URL] = $url;
            curl_setopt_array($ch, $options);
            curl_multi_add_handle($mh, $ch);// 增加句柄
            $map[strval($ch)]     = $url;
        }

        $active = null;
        // 执行批处理句柄
        do {
            if (($mrc = curl_multi_exec($mh, $active)) != CURLM_CALL_MULTI_PERFORM) {
                if ($mrc != CURLM_OK) { break; } //如果没有准备就绪，就再次调用curl_multi_exec
                // 获取http返回的结果
                while ($done = curl_multi_info_read($mh)) {
                    $info = curl_getinfo($done["handle"]);
                    $error = curl_error($done["handle"]);
                    $result = curl_multi_getcontent($done["handle"]);
                    $url = $map[strval($done["handle"])];
                    $rtn = compact('info', 'error', 'result', 'url');
                    if (trim($callback)) {
                        $callback($rtn);
                    }
                    $response[$url] = $rtn;

                    // 将$mh中的句柄移除
                    curl_multi_remove_handle($mh, $done['handle']);
                    curl_close($done['handle']);
                    //如果仍然有未处理完毕的句柄，那么就select
                    if ($active) {
                        curl_multi_select($mh, 1); //此处会导致阻塞大概1秒。
                    }
                }
            }
        } while ($active);
        // 关闭全部句柄
        curl_multi_close($mh);
        return $response;
    }

    /**
     * 检查系统函数是否禁用
     * 两种方法可以禁用： safe_mode或 disable_functions.
     * @param string $func
     * @return bool
     */
    public static function isAvailable($func) {
        if (ini_get('safe_mode')) return false;
        // 获取禁用函数
        $disabled = ini_get('disable_functions');
        if ($disabled) {
            $disabled = explode(',', $disabled);
            $disabled = array_map('trim', $disabled);
            return !in_array($func, $disabled);
        }
        return true;
    }
}

function async_test1()
{
    // 需要异步执行的函数
    $func = function($param) {
        // 处理逻辑
        $str = implode(',', $param);

        // 打印结果
        echo $str . "\n";

        // 休眠1秒
        sleep(1);
    };

    // 同步执行的操作，先于Async::task执行
    $arr = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15];//假设很多
    $arr = array_chunk($arr, 4, TRUE);
    for ($i = 0; $i < 4; $i++) {
        // 异步执行函数
        Async::task($func, $arr[$i]);
    }
    echo "End";
}

function async_test2()
{
    Async::open_request('php D:\wwwroot\warehouse\api\read_stdin.php  -a value -b a,b,c,d,e,f &');
    //read_stdin.php 内容如下:
    //PHP命令行脚本接收传入参数的方式
    //1) 使用$argv or $argc参数接收
    //var_dump($argv);// 传递给脚本的参数数组 >php test.php a b

    //2) 使用getopt函数
    var_dump(getopt('a:b:'));// 从命令行参数列表中获取选项 >php  test.php -a 123 -b 456

    //3) 命令行输入参数 php test.php --dir="./tmp" --cmd="echo '1'"
    // __construct 中提示用户输入 解析命令行参数，有一个:号表示必填项
    $opts = getopt('', ['dir:', 'cmd:']);
    if (!$opts) {
        echo 'Invalid parameter', PHP_EOL;
        exit;
    }
    if (empty($opts['dir'])) {
        echo '--dir Required', PHP_EOL;
        exit;
    }
    if (empty($opts['cmd'])) {
        echo '--cmd Required', PHP_EOL;
        exit;
    }
}

function async_test3()
{
    Async::proc_open_request('php D:\wwwroot\warehouse\api\read_stdin.php', ['name'=>'wm']);

    // read_stdin.php 内容如下:
    // while(true) {
    //    $line = fgets(STDIN);// getmypid(); 子进程
    //    if ($line) {
    //        //unserialize($line);
    //        echo $line;
    //    } else {
    //        break;
    //    }
    //}
}

function async_test4()
{
    echo '开始时间是：'.date("h:i:sa");

    $urls=[];
    for ($i=0; $i <100 ; $i++) {
        $urls[]="http://www.downxia.com/downinfo/2315".$i.".html";
    }

    function deal($data){
        if ($data["error"] == '') {
            print_r($data["info"]);
        } else {
            echo $data["url"]." -- ".$data["error"]."\n";
        }
    }

    Async::curl_multi_request($urls,'deal');
    echo '结束时间是：'. date("h:i:sa");
}

